1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: MultiplexOutputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
44 */
45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.IOException;
48 import java.io.OutputStream;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52
53
54 /***
55 * An <code>OutputStream</code> which multiplexes data over a shared physical
56 * connection, managed by a {@link Multiplexer}.
57 * <p/>
58 * <em>NOTE:</em> the <code>OutputStream</code> methods of this class are not
59 * thread safe
60 *
61 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
62 * @version $Revision: 1.2 $ $Date: 2005/04/02 13:23:12 $
63 * @see Multiplexer
64 */
65 class MultiplexOutputStream extends OutputStream implements Constants {
66
67 /***
68 * The channel identifier, used to associate packets with a channel.
69 */
70 private final int _channelId;
71
72 /***
73 * The packet type.
74 */
75 private byte _type;
76
77 /***
78 * The multiplexer which handles this stream's output.
79 */
80 private Multiplexer _multiplexer;
81
82 /***
83 * The local data buffer.
84 */
85 private byte[] _data;
86
87 /***
88 * The current index into <code>_data</code>.
89 */
90 private int _index;
91
92 /***
93 * The no. of bytes that the remote endpoint can currently accept.
94 */
95 private int _remoteSpace;
96
97 /***
98 * The maximum no. of bytes that the remote endpoint can accept.
99 */
100 private final int _maxRemoteSpace;
101
102 /***
103 * Indicates if the underlying connection has been closed.
104 */
105 private boolean _disconnected;
106
107 /***
108 * Synchronization helper.
109 */
110 private final Object _lock = new Object();
111
112 /***
113 * The logger.
114 */
115 private static final Log _log =
116 LogFactory.getLog(MultiplexOutputStream.class);
117
118
119 /***
120 * Construct a new <code>MultiplexOutputStream</code>.
121 *
122 * @param channelId the channel identifier
123 * @param multiplexer the multiplexer which handles this stream's output
124 * @param size the size of the local data buffer
125 * @param remoteSize the size of the remote endpoint's data buffer
126 */
127 public MultiplexOutputStream(int channelId, Multiplexer multiplexer,
128 int size, int remoteSize) {
129 _channelId = channelId;
130 _multiplexer = multiplexer;
131 _data = new byte[size];
132 _maxRemoteSpace = remoteSize;
133 _remoteSpace = remoteSize;
134 }
135
136 /***
137 * Set the packet type.
138 *
139 * @param type the packet type
140 */
141 public void setType(byte type) {
142 _type = type;
143 }
144
145 /***
146 * This implementation flushes the stream, rather than closing it, as the
147 * stream is re-used.
148 *
149 * @throws IOException if an I/O error occurs
150 */
151 public void close() throws IOException {
152 flush();
153 }
154
155 /***
156 * Flushes this output stream and forces any buffered output bytes to be
157 * written out.
158 *
159 * @throws IOException if an I/O error occurs
160 */
161 public void flush() throws IOException {
162 int offset = 0;
163 int length = _index;
164 while (offset < _index) {
165 int available = waitForSpace();
166 int size = (length <= available) ? length : available;
167
168 send(_data, offset, size);
169 offset += size;
170 length -= size;
171 }
172 _index = 0;
173 }
174
175 /***
176 * Writes length bytes from the specified byte array starting at offset to
177 * this output stream.
178 *
179 * @param buffer the data to write
180 * @param offset the start offset in the data
181 * @param length the number of bytes to write
182 * @throws IOException if an I/O error occurs
183 */
184 public void write(byte[] buffer, int offset, int length)
185 throws IOException {
186
187 int space = _data.length - _index;
188 if (space >= length) {
189
190 System.arraycopy(buffer, offset, _data, _index, length);
191 _index += length;
192 } else {
193 flush();
194 int size = length;
195
196 while (size > 0) {
197 int available = waitForSpace();
198 int count = (size <= available) ? size : available;
199 send(buffer, offset, count);
200 offset += count;
201 size -= count;
202 }
203 }
204 }
205
206 /***
207 * Writes the specified byte to this output stream.
208 *
209 * @param value the byte value
210 * @throws IOException if an I/O error occurs
211 */
212 public void write(int value) throws IOException {
213 if (_index >= _data.length) {
214 flush();
215 }
216 _data[_index++] = (byte) value;
217 }
218
219 /***
220 * Notify this of the no. of bytes read by the remote endpoint.
221 *
222 * @param read the number of bytes read
223 * @throws IOException if the no. of bytes exceeds that expected
224 */
225 public void notifyRead(int read) throws IOException {
226 synchronized (_lock) {
227 int space = _remoteSpace + read;
228 if (space > _maxRemoteSpace) {
229 throw new IOException("Remote space=" + space
230 + " exceeds expected space="
231 + _maxRemoteSpace);
232 }
233 _remoteSpace = space;
234
235 if (_log.isDebugEnabled()) {
236 _log.debug("notifyRead(read=" + read
237 + ") [channelId=" + _channelId
238 + ", remoteSpace=" + _remoteSpace
239 + "]");
240 }
241 _lock.notifyAll();
242 }
243 }
244
245 /***
246 * Invoked when the underlying physical connection is closed.
247 */
248 public void disconnected() {
249 synchronized (_lock) {
250 _disconnected = true;
251 _lock.notifyAll();
252 }
253 }
254
255 /***
256 * Returns a string representation of this.
257 *
258 * @return a string representation of this
259 */
260 public String toString() {
261 return "MultiplexOutputStream[index=" + _index + "]";
262 }
263
264 /***
265 * Sends length bytes from the specified byte array starting at offset to
266 * the endpoint.
267 *
268 * @param buffer the data to write
269 * @param offset the start offset in the data
270 * @param length the number of bytes to write
271 * @throws IOException if an I/O error occurs
272 */
273 private void send(byte[] buffer, int offset, int length)
274 throws IOException {
275 if (_log.isDebugEnabled()) {
276 _log.debug("send(length=" + length + ") [channelId=" + _channelId
277 + ", remoteSpace=" + _remoteSpace
278 + "]");
279 }
280 synchronized (_lock) {
281 _multiplexer.send(_type, _channelId, buffer, offset, length);
282 _type = DATA;
283
284 _remoteSpace -= length;
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301 }
302 }
303
304 /***
305 * Returns immediately if the endpoint can receive data, otherwise blocks,
306 * waiting for the endpoint to have space available.
307 *
308 * @return the number of bytes that the endpoint can accept
309 * @throws IOException if the connection is closed while blocking
310 */
311 private int waitForSpace() throws IOException {
312 int available = 0;
313 while (!_disconnected) {
314 synchronized (_lock) {
315 if (_log.isDebugEnabled()) {
316 _log.debug("waitForSpace() [channelId=" + _channelId
317 + ", remoteSpace=" + _remoteSpace
318 + "]");
319 }
320
321 if (_remoteSpace > 0) {
322 available = _remoteSpace;
323 break;
324 } else {
325 try {
326 _lock.wait();
327 } catch (InterruptedException ignore) {
328 }
329 }
330 }
331 }
332 if (_disconnected) {
333 throw new IOException("Connection has been closed");
334 }
335
336 return available;
337 }
338
339 }